共计 2914 个字符,预计需要花费 8 分钟才能阅读完成。
一. Docker 安装
1. 下载
# 下载 zookeeper 镜像
docker pull wurstmeister/zookeeper
# 下载 kafka 镜像
docker pull wurstmeister/kafka
2. 启动
- 方式一 : docker 启动
# 启动 zookeeper
docker run -d --name zookeeper --publish 2181:2181 --volume /etc/localtime:/etc/localtime wurstmeister/zookeeper
# 启动 kafka
docker run -d --name kafka3 --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka
- 方式二 : docker-compose 启动
# docker-compose.yml
version: '2'
services:
zookeeper:
image: "wurstmeister/zookeeper"
hostname: "zookeeper.local"
container_name: "zookeeper"
networks:
local:
aliases:
- "zookeeper.local"
kafka:
image: "wurstmeister/kafka"
hostname: "kafka.local"
container_name: "kafka"
ports:
- "9092:9092"
networks:
local:
aliases:
- "kafka.local"
environment:
KAFKA_ADVERTISED_HOST_NAME: 10.64.66.118 # 对外开放的 ip
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
networks:
local:
driver: bridge
# 简便写法
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
volumes:
- ./data:/data
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 10.64.66.118
KAFKA_MESSAGE_MAX_BYTES: 2000000
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
networks:
local:
driver: bridge
启动命令 :
docker-compose up -d
二. 终端测试
1. 打开两个命令框,都进入 kafka 容器内
docker exec -it kafka bash
2. 一个命令框测试创建 topic 并订阅
# 连接 zookeeper,创建一个 topic
/opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 -replication-factor 1 --partitions 1 --topic test01
# 查看 topic
/opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --list
# 消费者订阅该 topic
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka.local:9092 --topic test01 --from-beginning
第一条命令报错:
Error while fetching metadata with correl
原因:无法连接
zookeeper
, 重启该容器即可
3. 另一个终端测试生产数据
# 生产者
/opt/kafka/bin/kafka-console-producer.sh --broker-list kafka.local:9092 --topic test01
/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic "RCU_TDM_TOPIC"
测试:
- 在生产者终端不断发消息,在消费者终端可以收到消息
三.Python 操作 Kafka 测试
1. 测试
- server.py
from kafka import KafkaProducer
from kafka.errors import KafkaError
import time
class KafkaPythonDemo(object):
def __init__(self):
self.producer = KafkaProducer(bootstrap_servers=["10.64.66.118:9092"])
self.topic = 'test1'
def producerTest(self):
print('------ producer begins ------')
n = 1
try:
while (n <= 100):
self.producer.send(self.topic, str(n).encode())
print(f'send {str(n)}')
n += 1
time.sleep(0.5)
except KafkaError as e:
print(e)
finally:
self.producer.close()
print('======= 生产完毕 =========')
if __name__ == "__main__":
kafkaPythonDemo = KafkaPythonDemo()
kafkaPythonDemo.producerTest()
- client.py
from kafka import KafkaConsumer
consumer = KafkaConsumer('test1', bootstrap_servers='10.64.66.118:9092')
for msg in consumer:
print(msg.value.decode())
2. 对外开放连接配置问题
-
kafka 在运行在虚拟机容器内
-
映射 9092 端口
在虚拟机内可以连接 kafka, 在 windows 宿主机上无法连接
- 解决:修改
server.properties
配置文件
advertised.listeners=PLAINTEXT://10.64.66.118:9092 # 对外地址
正文完